查看原文
其他

yyds!用深度学习框架玩明日方舟,高端!

喜欢就关注我们吧!

每个游戏玩家都有一个梦,希望自己在虚拟世界中成为万众瞩目、无所不能的英雄。然后…然后…闹钟响了梦醒了,又到了挤地铁上班的时间。


不过,在这个项目中,我将带大家暂时忘却现实的烦恼,用飞桨深度强化学习框架PARL来实现这个“英雄梦”!先放效果图:



知识回顾



大家是不是迫不及待了呢?且慢,要实现《明日方舟》游戏的深度强化学习,还是先让我带大家回顾一下深度强化学习算法历史。DQN是深度强化学习算法开山之作,在经典街机游戏上取得了非常好的效果。它使用了ReplyMemory来存储和回放经验,这是Off-policy类型算法的常用技巧。但是,DQN在应对手机游戏时,能力就不够看了。于是我把目光投向了更为强大的算法--- A3C。

A3C算法与DQN不同,它设计了异步多线程的Actor-Critic,每个Agent在自己的线程中运行,然后全局共享学习到的网络参数。这样,每时每刻都能有大量的交互数据,并且这些多线程采集到的数据没有关联性(关联性问题:请参考DDQN算法原理)。因此,A3C算法通过“异步多线程+共享全局参数”达到了和ReplyMemory类似的效果。而且,它既有大量数据可以解决训练过程不稳定问题,同时又解决了参数关联性的问题。

在经典算法PG中,我们的Agent又被称为Actor,Actor对于一个特定的任务,都有自己的一个策略π。策略π通常用一个神经网络表示,其参数为θ。从一个特定的状态State出发,一直到任务的结束,被称为一个完整的Episode。在每一步,我们都能获得一个奖励r,一个完整的任务所获得的最终奖励被称为R。

如果我们用Q函数来预估未来的累积奖励,同时创建一个Critic网络来计算Q函数值,那么我们就得到了Actor-Critic方法。

Q函数在A3C里的主要作用是增加一个基线,使得反馈有正有负,这里的基线通常用状态价值函数V来表示。但是,当我们应用这样的方法,则需要同时计算Q函数和V函数,这并不容易。Q函数可以用“Step t+1的V函数”加上“从Step t到Step t+1的r”来代替。这样,我们就可以得到用V来表示的Q值计算,我们一般称为Advantage(优势函数),此时的Critic网络变为计算优势函数A的网络。

A3C是Asynchronous Advantage Actor-Critic的缩写,中文翻译为异步的优势动作评价算法。其中,Advantage就是指优势函数A。因此,从名字这里我们可以解读出来A3C实质就是求解πθ网络和Aπ(s, a)网络。

在A3C算法论文中,论文作者对比了四种算法——异步Sarsa、异步Q-Learning、DQN和A3C。论文发表后,各路算法大神验证一个问题——是异步更新让算法表现优于其他算法?。结果非常有趣:多线程是A3C算法快的原因,但是”异步更新“反而是它的缺点。于是,科学家提出同步更新算法A2C(Advantage Actor-Critic),让它可以更有效利用CPU资源。

PS:算法大神照样被打脸,啪啪啪!


在下面部分,我会先对PARL库内置的A2C算法进行简单解读,这样大家在看项目实践部分时,就能少阅读一些代码。




Learner

这个类有意思的地方是,PARL库用了A3C的名字。原因是A2C和A3C是同源算法。它们实现上的主要区别是step函数(后面会讲到)。

env = gym.make(config['env_name'])
        env = wrap_deepmind(env, dim=config['env_dim'], obs_format='NCHW')
obs_shape = env.observation_space.shape
act_dim = env.action_space.n
self.config['obs_shape'] = obs_shape
self.config['act_dim'] = act_dim

model = AtariModel(act_dim)
algorithm = parl.algorithms.A3C(
    model, vf_loss_coeff=config['vf_loss_coeff'])
self.agent = AtariAgent(algorithm, config)




create_actors

这段代码有意思的地方是,它把自己连接到了XPARL集群,然后去执行run_remote_sample。阅读过DQN源码的同学应该很好理解,它的意思就是在独立进程运行“取样”。

def create_actors(self):
        # 先把自己连接到XPARL集群上去
        parl.connect(self.config['master_address'])
        for i in six.moves.range(self.config['actor_num']):
            ...
            remote_thread = threading.Thread(
                # 在工作线程中运行run_remote_sample函数
                # 通过params_queue传递模型的参数
                target=self.run_remote_sample, args=(params_queue, ))
            remote_thread.setDaemon(True)
            remote_thread.start()
        ...




step函数

step函数是A2C算法中最重要、独特的函数,作用是同步等待更新操作。因为A2C算法会同步等待所有Agent(Actor)完成一轮训练后,把π网络的参数θ同步上来,更新全局的π网络参数。




Actor函数

注解@parl.remote_class表明Actor类是在独立的本机进程中执行(因为A2C是利用本机多CPU)。通过两行命令部署了PARL分布式集群,Actor实际是在远程server中运行了。

注意,Actor的init方法中保存了env数组,用同样的参数实例化了模型,用同样的模型实例化了算法并作为参数传入到了Agent中

@parl.remote_class
class Actor(object):
    def __init__(self, config):
        ...
        # Actor保存了env数组
        self.envs = []
        for _ in range(config['env_num']):
            env = gym.make(config['env_name'])
            env = wrap_deepmind(env, dim=config['env_dim'], obs_format='NCHW')
            self.envs.append(env)
        ...
        model = AtariModel(act_dim)
        algorithm = parl.algorithms.A3C(
            model, vf_loss_coeff=config['vf_loss_coeff'])
        self.agent = AtariAgent(algorithm, config)

大家还要关注的点是,每个Actor对应一个Agent。




sample函数

Actor中的sample函数会调用Agent的sample函数和Agent的value函数来分别更新本地的π网络和v网络,最终返回sample_data给中心节点。

...

actions_batch, values_batch = self.agent.sample(np.stack(self.obs_batch))

...

next_value = self.agent.value(next_obs)

...

sample_data的数据结构:

sample_data['obs'].extend(env_sample_data[env_id]['obs'])
sample_data['actions'].extend(env_sample_data[env_id]['actions'])
sample_data['advantages'].extend(advantages)
sample_data['target_values'].extend(target_values)

其中,优势函数的的计算如下:

# gae:generalized advantage estimator
advantages = calc_gae(rewards, values, next_value,
                      self.config['gamma'],
                      self.config['lambda'])
target_values = advantages + values





VectorEnv函数

这个类是PARL对env环境的封装。我们的模拟真机环境,也采用了同样的定义,主要是为了同时跑多个环境,增加并行计算的效率,如下所示:

class VectorEnv(object):
    def __init__(self, envs):
    def reset(self):
        ...
    def step(self, actions):
            # env需要实现step方法
            obs, reward, done, info = self.envs[env_id].step(actions[env_id])
        ...
            if done:
                # env需要实现reset方法
                obs = self.envs[env_id].reset()
        ...
    return obs_batch, reward_batch, done_batch, info_batch

模拟器的源数据是由此类中的step方法批量返回。



实战编程







1.游戏模拟器编写&训练

新建《明日方舟》模拟器项目:
ArKnight_A2C_Simulator

因为《明日方舟》是手机网络游戏,数据生产速度实在太慢了!!!为了提高训练速度,需要自己开发模拟器。用模拟器后速度可提升50-100倍。

修改Learner的初始化方法:

#=========== Create Agent ==========
game = ArKnights()
env = PMGE(game)
obs_shape = (3108192)
act_dim = 650

定义新的env.py:

class PMGE(object):
    def __init__(self, game):
        self.game = game
    def step(self, action):
        # 模拟器简化了状态判断
        # 实际项目应该实时生成:当前屏幕--> stateCode 的关系
        s1 = [ self.game.stateCode ]
        # 产生状态变化
        self.game.act(action, s1)
        reward = self.game.getScore(s1)
        isOver = self.game.gameOver()
        next_obs = self.game.render()
        # 为了匹配标准的API
        return next_obs, reward, isOver, 0

    def reset(self):
        return self.game.reset()

修改Actor:

class Actor(object):
    def __init__(self, config):
        self.config = config
        self.envs = []
        for _ in range(config['env_num']):
            game = ArKnights()
            env = PMGE(game)
            self.envs.append(env)
        self.vector_env = VectorEnv(self.envs)
        self.obs_batch = self.vector_env.reset()
        model = Model(config['act_dim'])
        algorithm = parl.algorithms.A3C(
            model, vf_loss_coeff=config['vf_loss_coeff'])
        self.agent = Agent(algorithm, config)

定义训练用的模拟环境:

class ArKnights(object):
    def __init__(self):
        """
        游戏《明日方舟》智能体定义
        """

        self.stateCode = 990
        # 1920x1080 ----- 1920/80 x 1080/40 = 24x27
        self.tap_dim = 24*27
        self.swipe_dim = 4 # 上下左右

    def render(self):
        imgDir = IMAGE_DIR + str(self.stateCode) + '/'
        filenames = os.listdir(imgDir)
        # 在stateCode目录下随机取一张图片
        filename = random.choice(filenames)
        return self.transform_img(imgDir + filename)

    def act(self, action, stateCode):
        if stateCode[0] == 990:
            if action in [442,443,444,445,466,467,468,469]:
                self.stateCode = 970

        if stateCode[0] == 970:
            if action in [111,112,113,114,115,
                          135,136,137,138,139,
                          159,160,161,162,163,
                          183,184,185,186,187,
                          207,208,209,210,211]:
                self.stateCode = 965

    def getScore(self, s1):
        # 状态没变扣一分
        if s1[0] == self.stateCode:
            return -1
        return 1

    def gameOver(self):
        code = self.stateCode
        # if (code == 910 or code == 1010):
        # for debug 让算法快速收敛
        if (code == 965):
            return True
        return False

    def reset(self):
        self.stateCode = 990
        imgDir = IMAGE_DIR + str(self.stateCode) + '/'
        filenames = os.listdir(imgDir)
        # 在990目录下随机取一张图片
        filename = random.choice(filenames)
        return self.transform_img(imgDir + filename)

    def transform_img(self, filepath):
        # 直接读取 (h,w)
        img = cv2.imread(filepath, cv2.IMREAD_COLOR)
        # 将图片尺寸缩放道 (image, (w,h)) 192x108
        img = cv2.resize(img, (192108))
        # 因为cv2的数组长宽是反的,所以用numpy转置一下 (C,H,W)
        img = np.transpose(img, (201))
        obs = img.astype('float32')
        return obs     

在模拟器中经过大约10万个steps,模型的loss就收敛了。




2.编写状态推理引擎

新建项目ARKNIGHT_CLASSIFY,使用残差神经网络对《明日方舟》中的主要游戏界面做了预定义。利用这个引擎,在真机部署的时候可以推断出当前游戏的state,用于计算reward和game over这两个重要参数。




3.评估强化学习模型

在深度强化学习中,效果评估非常重要,因为我们要知道算法从数据中学到了什么?

我们在第一步中得到了模型,在第二步中得到了真机环境下的reward和game over函数。

那么我们就要在真机环境中去测试。

def test():
    game = ArKnights()
    env = PMGE(game)
    obs_shape = (3108192)
    act_dim = 650
    config['obs_shape'] = obs_shape
    config['act_dim'] = act_dim
    model = Model(act_dim)
    algorithm = parl.algorithms.A3C(model, vf_loss_coeff=config['vf_loss_coeff'])
    agent = Agent(algorithm, config)
    agent.restore("./model_dir")
    # 初始状态
    obs = env.reset()
    MAX_STEP = 20
    step = 0
    while True:
        state_code = env.game.stateCode
        action = agent.predict(obs)
        obs, reward, isOver, _ = env.step(action)
        next_state_code = env.game.stateCode
        step += 1
        logger.info("evaluate state_code:{}, action:{} next_state_code:{}, reward:{}, isOver:{}".format(state_code, action, next_state_code, reward, isOver))
        if isOver or step >MAX_STEP:
            logger.info("GameOver, state:{}".format(next_state_code))
            break;

可以看到,我只用了2步,算法就成功达到了设定的终止状态[965]。新建部署项目ArKnight_A2C,把模型导入,效果如下:




4.模型和状态推理引擎部署到真机

定义真机环境:

import time
import cv2
from PIL import Image
import numpy as np
from adbutil import AdbUtil
from resnet import ResNet
import paddle
import paddle.fluid as fluid

class ArKnights(object):
    def __init__(self):
        self.adbutil = AdbUtil()

        # 加载推理模型
        with fluid.dygraph.guard():
            # 加载状态推断引擎
            self.model = ResNet('resnet'50)
            #加载模型参数
            model_state_dict, _ = fluid.load_dygraph("arknights")
            self.model.load_dict(model_state_dict)
            self.model.eval()

    def _restart(self):
        """
        打开游戏进程
        如果已经打开,先关闭再重新打开
        """

        self.adbutil.stopArKnights()
        self.adbutil.startArKnights()
        # 每隔1秒在屏幕中心点击1下,持续20秒
        self.adbutil.taptap(960,540,20,1)

    def _stop(self):
        """
        关闭游戏进程
        """

        self.adbutil.stopArKnights()

    def act(self, action):
        # 点击动作code映射成动作
        if action < 648:
            x = (action % 24) * 80 + 40 # 取余
            y = (action // 24) * 40 + 20 # 取商
            self.adbutil.taptap(x,y,1,0.01# x,y,count,frequency
        elif action == 648:
            self.adbutil.rightswipeswipe(2,0.5)
        elif action == 649:      
            self.adbutil.leftswipeswipe(2,0.5)
        else:
            raise("No such action error!" + str(action))
        time.sleep(2# 等动作执行完

    def render(self):
        # TODO check shape
        img = self.adbutil.screencap()
        img = img.resize((192108), Image.ANTIALIAS)
        # 因为图片的数组长宽是反的,所以用numpy转置一下 (C,H,W)
        img = np.transpose(img, (201))
        obs = img.astype('float32')
        return obs

    def reset(self):
        self._restart()
        return self.render()

    def gameOver(self):
        state = self.inferState()
        print("state"+str(state))
        if state[0] == 965:
            return True
        else:
            return False

    def inferState(self):
        """
        图片推断
        """

        ...

这里的游戏状态推断引擎,就是ARKNIGHT_CLASSIFY项目输出的推理模型。有了状态的推理值,代码中的reward和game over就可以和真机环境匹配上。同时,用AdbUtil类来执行真实动作,就可以操作真机执行算法动作。最终真机运行效果如下(手机屏幕的变化请看视频):

在这个文章中,我给大家展示了如何构建明日方舟的交互环境,以及如何通过PARL快速调用A3C算法实现并行训练,整体实现起来简单易懂。

看到这儿,大家是不是迫不及待地想要自己动手尝试!

“英雄们”,快用飞桨去实现你们的美梦吧,yyds(永远滴神)!

欲知详情,请戳PARL开源链接:

https://github.com/PaddlePaddle/PARL



Rust引入Linux内核开发?Google:支持!

2021-04-16

Deno 1.9发布,新增原生HTTP/2 Web服务器

2021-04-16

历史性突破!龙芯重磅推出自主指令系统架构LoongArch

2021-04-15



觉得不错,请点个在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存